'use strict';

var Readable = require('stream').Readable;
var streamsOpts = { objectMode: true };

/**
 * In-memory implementation of the message store
 * This can actually be saved into files.
 *
 */
function Store() {
    if (!(this instanceof Store)) {
        return new Store();
    }

    this._inflights = {};
}

/**
 * Adds a packet to the store, a packet is
 * anything that has a messageId property.
 *
 */
Store.prototype.put = function (packet, cb) {
    this._inflights[packet.messageId] = packet;

    if (cb) {
        cb();
    }

    return this;
};

/**
 * Creates a stream with all the packets in the store
 *
 */
Store.prototype.createStream = function () {
    var stream = new Readable(streamsOpts);
    var inflights = this._inflights;
    var ids = Object.keys(this._inflights);
    var destroyed = false;
    var i = 0;

    stream._read = function () {
        if (!destroyed && i < ids.length) {
            this.push(inflights[ids[i++]]);
        } else {
            this.push(null);
        }
    };

    stream.destroy = function () {
        if (destroyed) {
            return;
        }

        var self = this;

        destroyed = true;

        process.nextTick(function () {
            self.emit('close');
        });
    };

    return stream;
};

/**
 * deletes a packet from the store.
 */
Store.prototype.del = function (packet, cb) {
    packet = this._inflights[packet.messageId];
    if (packet) {
        delete this._inflights[packet.messageId];
        cb(null, packet);
    } else if (cb) {
        cb(new Error('missing packet'));
    }

    return this;
};

/**
 * get a packet from the store.
 */
Store.prototype.get = function (packet, cb) {
    packet = this._inflights[packet.messageId];
    if (packet) {
        cb(null, packet);
    } else if (cb) {
        cb(new Error('missing packet'));
    }

    return this;
};

/**
 * Close the store
 */
Store.prototype.close = function (cb) {
    this._inflights = null;
    if (cb) {
        cb();
    }
};

module.exports = Store;
